Guava RateLimiter

参考 RateLimiter 代码。

基本使用

1
2
3
4
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
1
2
RateLimiter rateLimiter = RateLimiter.create(1);
rateLimiter.acquire();

代码实现

抽取 SmoothBursty 限流器的关键代码,梳理基本的实现流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class RateLimiter {

/**
* 最多可以存储多少秒的许可来应对突发流量
*/
private final double maxBurstSeconds;

/**
* 允许存储的最大许可数量
*/
private final double maxPermits;

/**
* 生成许可的间隔时间
*/
private final double stableIntervalMicros;

/**
* 当前存储的许可数量
*/
private double storedPermits;

/**
* 允许下一个请求获取许可的时间
*/
private long nextFreeTicketMicros = 0L;

/**
* 每秒生成的许可数量
*/
RateLimiter(double permitsPerSecond) {
this.maxBurstSeconds = 1.0;
maxPermits = maxBurstSeconds * permitsPerSecond;
stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
}

/**
* 获取指定数量的许可,阻塞请求知道可以获取,返回等待的时间(秒)
*/
public double acquire(int permits) {
// 参数范围检查
if (permits <= 0) {
throw new IllegalArgumentException(String.format("Requested permits (%s) must be positive", permits));
}
long microsToWait;
// 加锁保证线程安全
synchronized (this) {
Instant instant = Instant.now();
long nowMicros = instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000;
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
microsToWait = max(momentAvailable - nowMicros, 0);
}
LockSupport.parkNanos(microsToWait * 1000);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

/**
* 返回当前请求允许获取许可的时间,设置当前存储的许可数量,以及下一个请求允许获取许可的时间
*/
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 如果当前时间大于 nextFreeTicket,则重新生成当前时间存储的许可数量,以及当前请求允许获取许可的时间
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
// 存储当前请求允许获取许可的时间作为返回值
long returnValue = nextFreeTicketMicros;
// 消耗已有的许可数量,然后根据需要新获取的许可数量,生成下一次请求的等待时间
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros = (long) (freshPermits * stableIntervalMicros);
// 将等待时间添加到 nextFreeTicketMicros 中,如果溢出则设置为 Long.MAX_VALUE
try {
nextFreeTicketMicros = Math.addExact(nextFreeTicketMicros, waitMicros);
} catch (ArithmeticException e) {
nextFreeTicketMicros = Long.MAX_VALUE;
}
storedPermits -= storedPermitsToSpend;
return returnValue;
}

public static void main(String[] args) {
RateLimiter rateLimiter = new RateLimiter(1);
for (int i = 0; i < 10; i++) {
double wait = rateLimiter.acquire(2);
System.out.printf("等待时间: %fs, 剩余许可: %f\n", wait, rateLimiter.storedPermits);
}
}
}
1
2
3
4
5
6
7
8
// 当前时间大于 nextFreeTicket,生成 maxPermits 个许可
// 请求 2 个许可,当前只有 1 个许可,等待 0 s,下一个请求的等待时间是 1 s
等待时间: 0.000000s, 剩余许可: 0.000000
// 请求 2 个许可,当前只有 0 个许可,等待 1 s,下一个请求的等待时间是 2 s
等待时间: 0.967138s, 剩余许可: 0.000000
// 请求 2 个许可,当前只有 0 个许可,等待 2 s,下一个请求的等待时间是 2 s
等待时间: 1.981604s, 剩余许可: 0.000000
等待时间: 1.989831s, 剩余许可: 0.000000
作者

Ligh0x74

发布于

2025-04-18

更新于

2025-04-18

许可协议

评论